[AWS CDK] S3サーバーアクセスログとCloudFrontアクセスログをAthenaで分析する環境を作ってみた

[AWS CDK] S3サーバーアクセスログとCloudFrontアクセスログをAthenaで分析する環境を作ってみた

簡単にログ分析環境を用意できます
Clock Icon2024.04.02

S3サーバーアクセスログとCloudFrontアクセスログをAthenaで分析する環境が欲しい

こんにちは、のんピ(@non____97)です。

皆さんはS3サーバーアクセスログとCloudFrontアクセスログをAthenaで分析する環境が欲しいなと思ったことはありますか? 私はあります。

S3サーバーアクセスログは以下記事で紹介しているとおり、日付ベースのオブジェクトキー形式の出力をサポートしています。これにより、Partiton Projectionなどを用いたAthenaのスキャン範囲の絞り込みがしやすくなりました。

一方、CloudFrontアクセスログは2024/4/1時点で日付ベースのオブジェクトキーはサポートしていません。そのため、以下記事で紹介しているようにアクセスログが出力されたことをトリガーにアクセスログをリネームする(CopyObjectDeleteObject)を行う必要があります。これは手間です。

以前、AWS CDKでCloudFrontとS3を使ったWebサイトを構築してみました。

これに付け足す形でS3サーバーアクセスログとCloudFrontアクセスログをAthenaで分析する環境を作ってみます。

AWS CDKのコードの紹介

やっていること

AWS CDKのコードは以下リポジトリに保存しています。

前回に追加して、やっていることは以下のとおりです。

  • Athenaのワークスペース および、クエリ結果の出力先S3バケットの作成 (Optional)
  • Glueデータベースの作成 (Optional)
  • S3サーバーアクセスログ用のGlueテーブルの作成 (Optional)
  • CloudFrontアクセスログ用のGlueテーブルの作成 (Optional)
  • CloudFrontアクセスログをリネームするLambda関数とEventBridge Ruleの作成 (Optional)

上述の処理を実行するかの判断はlogAnalyticsというプロパティで行います。

import * as cdk from "aws-cdk-lib";
import * as path from "path";

export type LogType = "s3ServerAccessLog" | "cloudFrontAccessLog";
.
.
(中略)
.
.
export interface LogAnalytics {
  createWorkGroup?: boolean;
  enableLogAnalytics?: LogType[];
}
.
.
(中略)
.
.
export interface WebsiteProperty {
  hostedZone?: HostZoneProperty;
  certificate?: CertificateProperty;
  contentsDelivery?: ContentsDeliveryProperty;
  allowDeleteBucketAndObjects?: boolean;
  s3ServerAccessLog?: AccessLog;
  cloudFrontAccessLog?: AccessLog;
  logAnalytics?: LogAnalytics;
}

Athenaで使用するDBとテーブルの作成

Athenaで使用するGlueのDBとテーブルは専用のConstructを用意し、そこで行います。1つのConstructにまとめて関数を呼び出す形でそれぞれ作成します。おとなしく専用のConsrtuctを作成した方が分かりやすいような気もしています。

import * as cdk from "aws-cdk-lib";
import { Construct } from "constructs";
import { LogAnalytics, LogType } from "../../parameter/index";
import { BucketConstruct } from "./bucket-construct";

// ログの種類ごとのロケーションやテーブル定義の情報
interface LogTable {
  location: string;
  storageLocationTemplate?: string;
  tableInput: cdk.aws_glue.CfnTable.TableInputProperty;
}

type LogTables = {
  [key in LogType]: LogTable;
};

// Glue DBを作成する関数のプロパティ
interface CreateDatabaseProperty {
  databaseName: string;
}

// Glue Tableを作成する関数のプロパティ
interface CreateTableProperty {
  databaseName: string;
  logType: LogType;
  locationPlaceHolder: {
    logBucketName: string;
    logSrcResourceAccountId: string;
    logSrcResourceId?: string;
    logSrcResourceRegion?: string;
    prefix?: string;
  };
}

// 関数ごとのテーブル定義
const s3ServerAccessLog: LogTable = {
  location:
    "s3://#{logBucketName}/#{prefix}#{logSrcResourceAccountId}/#{logSrcResourceRegion}/#{logSrcResourceId}",
  storageLocationTemplate: "#{location}/${date}",
  tableInput: {
    name: "s3_server_access_log",
    tableType: "EXTERNAL_TABLE",
    storageDescriptor: {
      columns: [
        { name: "bucketowner", type: "string" },
        { name: "bucket_name", type: "string" },
        { name: "requestdatetime", type: "string" },
        { name: "remoteip", type: "string" },
        { name: "requester", type: "string" },
        { name: "requestid", type: "string" },
        { name: "operation", type: "string" },
        { name: "key", type: "string" },
        { name: "request_uri", type: "string" },
        { name: "httpstatus", type: "string" },
        { name: "errorcode", type: "string" },
        { name: "bytessent", type: "bigint" },
        { name: "objectsize", type: "bigint" },
        { name: "totaltime", type: "string" },
        { name: "turnaroundtime", type: "string" },
        { name: "referrer", type: "string" },
        { name: "useragent", type: "string" },
        { name: "versionid", type: "string" },
        { name: "hostid", type: "string" },
        { name: "sigv", type: "string" },
        { name: "ciphersuite", type: "string" },
        { name: "authtype", type: "string" },
        { name: "endpoint", type: "string" },
        { name: "tlsversion", type: "string" },
        { name: "accesspointarn", type: "string" },
        { name: "aclrequired", type: "string" },
      ],
      inputFormat: "org.apache.hadoop.mapred.TextInputFormat",
      outputFormat:
        "org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat",
      serdeInfo: {
        serializationLibrary: "org.apache.hadoop.hive.serde2.RegexSerDe",
        parameters: {
          "input.regex":
            '([^ ]*) ([^ ]*) \\[(.*?)\\] ([^ ]*) ([^ ]*) ([^ ]*) ([^ ]*) ([^ ]*) ("[^"]*"|-) (-|[0-9]*) ([^ ]*) ([^ ]*) ([^ ]*) ([^ ]*) ([^ ]*) ([^ ]*) ("[^"]*"|-) ([^ ]*)(?: ([^ ]*) ([^ ]*) ([^ ]*) ([^ ]*) ([^ ]*) ([^ ]*) ([^ ]*) ([^ ]*))?.*$',
        },
      },
    },
    parameters: {
      has_encrypted_data: true,
      "projection.enabled": true,
      "projection.date.type": "date",
      "projection.date.interval": "1",
      "projection.date.interval.unit": "DAYS",
      "projection.date.range": "NOW-1YEARS, NOW+9HOUR",
      "projection.date.format": "yyyy/MM/dd",
    },
    partitionKeys: [{ name: "date", type: "string" }],
  },
};

const cloudFrontAccessLog: LogTable = {
  location:
    "s3://#{logBucketName}/#{prefix}partitioned/#{logSrcResourceAccountId}/#{logSrcResourceId}",
  storageLocationTemplate: "#{location}/${date}",
  tableInput: {
    name: "cloudfront_access_log",
    tableType: "EXTERNAL_TABLE",
    storageDescriptor: {
      columns: [
        { name: "log_date", type: "date" },
        { name: "time", type: "string" },
        { name: "x_edge_location", type: "string" },
        { name: "sc_bytes", type: "bigint" },
        { name: "c_ip", type: "string" },
        { name: "cs_method", type: "string" },
        { name: "cs_host", type: "string" },
        { name: "cs_uri_stem", type: "string" },
        { name: "sc_status", type: "int" },
        { name: "cs_referer", type: "string" },
        { name: "cs_user_agent", type: "string" },
        { name: "cs_uri_query", type: "string" },
        { name: "cs_cookie", type: "string" },
        { name: "x_edge_result_type", type: "string" },
        { name: "x_edge_request_id", type: "string" },
        { name: "x_host_header", type: "string" },
        { name: "cs_protocol", type: "string" },
        { name: "cs_bytes", type: "bigint" },
        { name: "time_taken", type: "float" },
        { name: "x_forwarded_for", type: "string" },
        { name: "ssl_protocol", type: "string" },
        { name: "ssl_cipher", type: "string" },
        { name: "x_edge_response_result_type", type: "string" },
        { name: "cs_protocol_version", type: "string" },
        { name: "fle_status", type: "string" },
        { name: "fle_encrypted_fields", type: "string" },
        { name: "c_port", type: "int" },
        { name: "time_to_first_byte", type: "float" },
        { name: "x_edge_detailed_result_type", type: "string" },
        { name: "sc_content_type", type: "string" },
        { name: "sc_content_len", type: "bigint" },
        { name: "sc_range_start", type: "bigint" },
        { name: "sc_range_end", type: "bigint" },
      ],
      inputFormat: "org.apache.hadoop.mapred.TextInputFormat",
      outputFormat:
        "org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat",
      serdeInfo: {
        serializationLibrary:
          "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe",
        parameters: {
          "field.delim": "\t",
          "serialization.format": "\t",
        },
      },
    },
    parameters: {
      has_encrypted_data: true,
      "skip.header.line.count": "2",
      "projection.enabled": true,
      "projection.date.type": "date",
      "projection.date.interval": "1",
      "projection.date.interval.unit": "DAYS",
      "projection.date.range": "NOW-1YEARS, NOW+9HOUR",
      "projection.date.format": "yyyy/MM/dd",
    },
    partitionKeys: [{ name: "date", type: "string" }],
  },
};

const logTables: LogTables = {
  s3ServerAccessLog,
  cloudFrontAccessLog,
};

export interface LogAnalyticsConstructProps extends LogAnalytics {
  queryOutputBucketConstruct?: BucketConstruct;
}

export class LogAnalyticsConstruct extends Construct {
  constructor(scope: Construct, id: string, props: LogAnalyticsConstructProps) {
    super(scope, id);

    if (!props.queryOutputBucketConstruct) {
      return;
    }

    new cdk.aws_athena.CfnWorkGroup(this, "WorkGroup", {
      name: `workgroup-log-analytics-${cdk.Lazy.string({
        produce: () => cdk.Names.uniqueId(this),
      })}`,
      recursiveDeleteOption: true,
      state: "ENABLED",
      workGroupConfiguration: {
        bytesScannedCutoffPerQuery: 1073741824,
        enforceWorkGroupConfiguration: false,
        publishCloudWatchMetricsEnabled: true,
        requesterPaysEnabled: false,
        resultConfiguration: {
          outputLocation:
            props.queryOutputBucketConstruct.bucket.s3UrlForObject(),
        },
      },
    });
  }

  // Glue DBの作成
  public createDatabase = (
    id: string,
    props: CreateDatabaseProperty
  ): cdk.aws_glue.CfnDatabase => {
    return new cdk.aws_glue.CfnDatabase(this, id, {
      catalogId: cdk.Stack.of(this).account,
      databaseInput: {
        name: props.databaseName,
      },
    });
  };

  // Glue Tableの作成
  public createTable = (id: string, props: CreateTableProperty) => {
    // プレフィックスの整形
    props.locationPlaceHolder.prefix = props.locationPlaceHolder.prefix
      ? `${props.locationPlaceHolder.prefix}/`
      : "";

    const logTable = logTables[props.logType];
    const tableInput = logTable.tableInput;

    // 定義したロケーションのプレースホルダーを受け取ったプロパティで置換
    const location = this.replacePlaceholders(
      logTable.location,
      props.locationPlaceHolder
    );
    const storageLocationTemplate = logTable.storageLocationTemplate?.replace(
      "#{location}",
      location
    );

    // テーブル定義と置換したロケーション、Partiton projection用のロケーションを合成して1つのオブジェクトに
    const mergedTableInput: cdk.aws_glue.CfnTable.TableInputProperty = {
      ...tableInput,
      storageDescriptor: {
        ...tableInput.storageDescriptor,
        location,
      },
      parameters: {
        ...tableInput.parameters,
        "storage.location.template": storageLocationTemplate,
      },
    };

    new cdk.aws_glue.CfnTable(this, id, {
      databaseName: props.databaseName,
      catalogId: cdk.Stack.of(this).account,
      tableInput: mergedTableInput,
    });
  };

  // 指定した文字列内からプレースホルダーを取得
  private getPlaceholders = (template: string): string[] => {
    const placeholderRegex = /#{([^}]+)}/g;
    const placeholders: string[] = [];
    let match;

    while ((match = placeholderRegex.exec(template)) !== null) {
      placeholders.push(match[1]);
    }
    return placeholders;
  };

  // プレースホルダーの置換
  private replacePlaceholders = (
    template: string,
    props: Record<string, string>
  ): string => {
    const placeholders = this.getPlaceholders(template);
    let result = template;

    for (const placeholder of placeholders) {
      if (props.hasOwnProperty(placeholder)) {
        result = result.replace(
          new RegExp(`#{${placeholder}}`, "g"),
          props[placeholder]
        );
      } else {
        throw new Error(`Placeholder not replaced: #{${placeholder}}`);
      }
    }

    return result;
  };
}

CloudFrontアクセスログのリネーム

CloudFrontアクセスログのリネームは以下AWS Blogで使用しているLambda関数が参考になります。

変更箇所は以下のとおりです。

  • TypeScriptで記述
  • S3イベント通知ではなく、EventBridgeのイベントを受け取るように変更
  • Apache Hive互換のプレフィックスもしくはyyyy/MM/dd/HHのプレフィックスのどちらか選択できるように変更
  • オブジェクトのコピーや削除時にエラーが発生した場合に例外を投げるように変更
// Copyright 2023 Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: MIT-0
import {
  S3Client,
  CopyObjectCommand,
  DeleteObjectCommand,
} from "@aws-sdk/client-s3";
import { EventBridgeEvent } from "aws-lambda";

interface EventObjectCreated {
  version: string;
  bucket: {
    name: string;
  };
  object: {
    key: string;
    size: number;
    etag: string;
    "version-id": string;
    sequencer: string;
  };
  "request-id": string;
  requester: string;
  "source-ip-address": string;
  reason: "PutObject";
}

const s3 = new S3Client();

// prefix to copy partitioned data to w/o leading but w/ trailing slash
const targetKeyPrefix = process.env.TARGET_KEY_PREFIX;

const hiveCompatiblePartitions = process.env.HIVE_COMPATIBLE_PARTITIONS;

// regex for filenames by Amazon CloudFront access logs. Groups:
// - 1.	year
// - 2.	month
// - 3.	day
// - 4.	hour
const datePattern = "[^\\d](\\d{4})-(\\d{2})-(\\d{2})-(\\d{2})[^\\d]";
const filenamePattern = "[^/]+$";

export const handler = async (
  event: EventBridgeEvent<"Object Created", EventObjectCreated>
) => {
  const bucket = event.detail.bucket.name;
  const sourceKey = event.detail.object.key;

  const sourceRegex = new RegExp(datePattern, "g");
  const match = sourceRegex.exec(sourceKey);

  if (match == null) {
    console.log(
      `Object key ${sourceKey} does not look like an access log file, so it will not be moved.`
    );
    return;
  }

  const [, year, month, day, hour] = match;

  const filenameRegex = new RegExp(filenamePattern, "g");
  const filenameRegExpExecArray = filenameRegex.exec(sourceKey);

  if (filenameRegExpExecArray === null) {
    console.log(
      `Object key ${sourceKey} does not look like an access log file, so it will not be moved.`
    );
    return;
  }

  const filename = filenameRegExpExecArray[0];

  const targetKey =
    hiveCompatiblePartitions === "true"
      ? `${targetKeyPrefix}year=${year}/month=${month}/day=${day}/hour=${hour}/${filename}`
      : `${targetKeyPrefix}${year}/${month}/${day}/${hour}/${filename}`;

  console.log(`Copying ${sourceKey} to ${targetKey}.`);

  const copyParams = {
    CopySource: `${bucket}/${sourceKey}`,
    Bucket: bucket,
    Key: targetKey,
  };

  try {
    await s3.send(new CopyObjectCommand(copyParams));
  } catch (e) {
    Error(`Error while copying ${sourceKey}: ${e}`);
  }

  console.log(`Copied. Now deleting ${sourceKey}.`);

  const deleteParams = { Bucket: bucket, Key: sourceKey };

  try {
    await s3.send(new DeleteObjectCommand(deleteParams));
  } catch (e) {
    throw new Error(`Error while deleting ${sourceKey}: ${e}`);
  }
  console.log(`Deleted ${sourceKey}.`);
};

CloudFrontアクセスログのログ分析を行う場合はCloudFrontのConstructで、Lambda関数の作成やアクセスログ出力先S3バケットのEventBridge通知の有効化、EventBridge Ruleの作成を行っています。

EventBridge Ruleでは「アクセスログをリネームしたことをトリガーにLambda関数が再起的に実行され、無限ループになる」ということを防ぐためにanything-butでリネーム先のプレフィックスのオブジェクトの作成ではトリガーしないようにしています。

if (
      props.cloudFrontAccessLogBucketConstruct &&
      props.enableLogAnalytics?.find((enableLogAnalytics) => {
        return enableLogAnalytics === "cloudFrontAccessLog";
      })
    ) {
      const targetKeyPrefix = props.logFilePrefix
        ? `${props.logFilePrefix}/partitioned/${cdk.Stack.of(this).account}/${
            this.distribution.distributionId
          }/`
        : `partitioned/${cdk.Stack.of(this).account}/${
            this.distribution.distributionId
          }/`;

      const moveCloudFrontAccessLogLambda =
        new cdk.aws_lambda_nodejs.NodejsFunction(
          this,
          "MoveCloudFrontAccessLogLambda",
          {
            entry: path.join(
              __dirname,
              "../src/lambda/move-cloudfront-access-log/index.ts"
            ),
            runtime: cdk.aws_lambda.Runtime.NODEJS_20_X,
            bundling: {
              minify: true,
              tsconfig: path.join(__dirname, "../src/lambda/tsconfig.json"),
              format: cdk.aws_lambda_nodejs.OutputFormat.ESM,
            },
            architecture: cdk.aws_lambda.Architecture.ARM_64,
            environment: {
              TARGET_KEY_PREFIX: targetKeyPrefix,
              HIVE_COMPATIBLE_PARTITIONS: "false",
            },
          }
        );

      props.cloudFrontAccessLogBucketConstruct.bucket.enableEventBridgeNotification();
      props.cloudFrontAccessLogBucketConstruct.bucket.grantReadWrite(
        moveCloudFrontAccessLogLambda
      );
      props.cloudFrontAccessLogBucketConstruct.bucket.grantDelete(
        moveCloudFrontAccessLogLambda
      );

      new cdk.aws_events.Rule(this, "CloudFrontAccessLogCreatedEventRule", {
        eventPattern: {
          source: ["aws.s3"],
          resources: [
            props.cloudFrontAccessLogBucketConstruct.bucket.bucketArn,
          ],
          detailType: ["Object Created"],
          detail: {
            object: {
              key: [
                {
                  "anything-but": {
                    prefix: targetKeyPrefix,
                  },
                },
              ],
            },
          },
        },
        targets: [
          new cdk.aws_events_targets.LambdaFunction(
            moveCloudFrontAccessLogLambda
          ),
        ],
      });
    }

やってみた

ログが出力されることを確認

実際にやってみます。

パラメーターは以下のとおりです。

export const websiteStackProperty: WebSiteStackProperty = {
  env: {
    account: process.env.CDK_DEFAULT_ACCOUNT,
    region: process.env.CDK_DEFAULT_REGION,
  },
  props: {
    contentsDelivery: {
      contentsPath: path.join(__dirname, "../lib/src/contents"),
      enableS3ListBucket: true,
    },
    allowDeleteBucketAndObjects: true,
    s3ServerAccessLog: {
      enableAccessLog: true,
      logFilePrefix: "s3_server_access_log",
      lifecycleRules: [{ expirationDays: 365 }],
    },
    cloudFrontAccessLog: {
      enableAccessLog: true,
      logFilePrefix: "cloudfront",
      lifecycleRules: [{ expirationDays: 365 }],
    },
    logAnalytics: {
      createWorkGroup: true,
      enableLogAnalytics: ["s3ServerAccessLog", "cloudFrontAccessLog"],
    },
  },
};

デプロイ後、CloudFrontディストリビューションのDNS名に何回かアクセスします。

それぞれログ出力されていることを確認します。

  • S3サーバーアクセスログ

S3サーバーアクセスログ

  • CloudFrontアクセスログ

CloudFrontアクセスログ

どちらも問題なく出力されています。

また、CloudFrontアクセスログをリネームするLambda関数のログを確認すると、確かに指定した箇所にリネームしていることが分かります。

2024-04-02T17:36:24.261+09:00	INIT_START Runtime Version: nodejs:20.v19 Runtime Version ARN: arn:aws:lambda:us-east-1::runtime:2d68d583b872accb8815f845161e9bbabc6ac3ed4d0ab8c23e75f8788cf5444a
2024-04-02T17:36:24.713+09:00	START RequestId: a2f091f7-37c6-4da9-b0eb-f0f89657a869 Version: $LATEST
2024-04-02T17:36:24.715+09:00	2024-04-02T08:36:24.715Z a2f091f7-37c6-4da9-b0eb-f0f89657a869 INFO Copying cloudfront/E2WKNYESFUKC6K.2024-04-02-08.cf9f3bbf.gz to cloudfront/partitioned/<AWSアカウントID>/E2WKNYESFUKC6K/2024/04/02/08/E2WKNYESFUKC6K.2024-04-02-08.cf9f3bbf.gz.
2024-04-02T17:36:25.789+09:00	2024-04-02T08:36:25.789Z a2f091f7-37c6-4da9-b0eb-f0f89657a869 INFO Copied. Now deleting cloudfront/E2WKNYESFUKC6K.2024-04-02-08.cf9f3bbf.gz.
2024-04-02T17:36:26.001+09:00	2024-04-02T08:36:26.001Z a2f091f7-37c6-4da9-b0eb-f0f89657a869 INFO Deleted cloudfront/E2WKNYESFUKC6K.2024-04-02-08.cf9f3bbf.gz.
2024-04-02T17:36:26.041+09:00	END RequestId: a2f091f7-37c6-4da9-b0eb-f0f89657a869
2024-04-02T17:36:26.041+09:00	REPORT RequestId: a2f091f7-37c6-4da9-b0eb-f0f89657a869 Duration: 1327.97 ms Billed Duration: 1328 ms Memory Size: 128 MB Max Memory Used: 93 MB Init Duration: 450.94 ms

Athenaを使ったクエリの実行

実際にAthenaを使ったクエリを叩いてみましょう。

まずはS3サーバーアクセスログです。本日分の10件のログを出力してみます。

SELECT
    * 
FROM
    "access_log"."s3_server_access_log" 
WHERE
    date = '2024/04/02'
LIMIT
    10

実行結果は以下のとおりです。Trusted AdvisorのスキャンなどCloudFront以外のアクセスがそれなりに多いですね。

S3サーバーアクセスログのクエリ結果

CloudFrontからのアクセスに絞ってみます。

SELECT
    * 
FROM
    "access_log"."s3_server_access_log" 
WHERE
    date = '2024/04/02' AND
    requester = 'svc:cloudfront.amazonaws.com'
LIMIT
    10

実行結果は以下のとおりです。CloudFrontからアクセスしたもののみになりました。

S3サーバーアクセスログのクエリ結果2

続いて、CloudFrontアクセスログです。

SELECT
    * 
FROM
    "access_log"."cloudfront_access_log" 
WHERE
    date = '2024/04/02'
LIMIT
    20

実行結果は以下のとおりです。問題なくログの表示ができました。

CloudFrontアクセスログのクエリ結果

それぞれのログのフィールドについては以下AWS公式ドキュメントをご覧ください。

簡単にログ分析環境を用意できます

AWS CDKを使ってS3サーバーアクセスログとCloudFrontアクセスログをAthenaで分析する環境を作ってみました。

この仕組みを使用すればVPC Flow LogsやALBのアクセスログの分析環境も簡単に用意できそうです。

この記事が誰かの助けになれば幸いです。

以上、AWS事業本部 コンサルティング部の のんピ(@non____97)でした!

Share this article

facebook logohatena logotwitter logo

© Classmethod, Inc. All rights reserved.